feat(BA-6155): add metadata-driven chunk-store upload session engine #11766
jopemachine wants to merge 1 commit into
Pull request overview
This PR introduces a new Valkey-backed, distributed-lock-guarded TUS upload session engine (TusUploadSession) for storage-proxy, where upload session metadata is stored in Valkey and chunk payloads are stored as per-offset files on the shared filesystem to support safe multi-replica uploads.
Changes:
- Add `TusUploadSession` engine, supporting lock-free temp chunk writes, locked metadata commits, final assembly, and cleanup.
- Introduce a dedicated `ValkeyTusClient` + lock factory wiring in storage-proxy startup and context.
- Add unit/integration-style tests for session state logic and the end-to-end engine behavior (including concurrent commit scenarios).
Reviewed changes
Copilot reviewed 21 out of 22 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `src/ai/backend/storage/services/upload/tus_session.py` | Core upload session engine: init/read state, temp chunk writes, commit, assemble, cleanup. |
| `src/ai/backend/storage/services/upload/types.py` | Session state schema + result/args dataclasses used by the engine. |
| `src/ai/backend/storage/services/upload/utils.py` | Filesystem path helpers for temp/committed/staging files and best-effort unlink. |
| `src/ai/backend/storage/services/upload/lock.py` | Distributed lock constants and RedisLock-backed lock factory creator. |
| `src/ai/backend/storage/services/upload/__init__.py` | Package marker for upload services. |
| `src/ai/backend/common/clients/valkey_client/valkey_tus/client.py` | New Glide-based Valkey client for storing session state with TTL + resilience policies. |
| `src/ai/backend/common/clients/valkey_client/valkey_tus/__init__.py` | Exports ValkeyTusClient. |
| `src/ai/backend/storage/server.py` | Wire Valkey TUS client + dedicated Redis-lock connection into storage-proxy root context. |
| `src/ai/backend/storage/context.py` | Add valkey_tus_client and tus_lock_factory to RootContext. |
| `src/ai/backend/storage/migration.py` | Populate new RootContext fields in migration context (currently with None + type-ignores). |
| `src/ai/backend/storage/errors/upload.py` | Add upload-session specific errors (ChunkConflictError, UploadSessionCorruptedError). |
| `src/ai/backend/storage/errors/__init__.py` | Export newly added upload errors. |
| `src/ai/backend/common/lock.py` | Add DistributedLockFactory protocol type for injection/factory typing. |
| `src/ai/backend/common/types.py` | Introduce TusSessionId NewType. |
| `src/ai/backend/common/defs/__init__.py` | Add REDIS_TUS_DB DB id constant. |
| `src/ai/backend/common/metrics/metric.py` | Add LayerType.VALKEY_TUS metric layer. |
| `tests/unit/storage/services/upload/test_tus_upload_session.py` | End-to-end tests for TusUploadSession, including concurrent commit scenarios. |
| `tests/unit/storage/services/upload/test_tus_session.py` | Unit tests for TusSessionState prefix-offset computation and offset lookup. |
| `tests/unit/storage/services/upload/conftest.py` | Fixtures for Valkey client and Redis lock factory using a redis container. |
| `tests/unit/storage/services/upload/BUILD` | Pants test target definitions for the new test package. |
| `tests/unit/storage/services/upload/__init__.py` | Test package marker. |
| `changes/11766.feature.md` | Towncrier entry documenting the new upload engine. |
fregataa left a comment
- Does a client of tus-session need to call `write_temp_chunk()`, `commit_chunk()` and `assemble()` separately?
- Are all chunks saved in Valkey until the upload completes?
- Is `commit_chunk()` called in `write_temp_chunk()`? The PR description and the code differ.
    async with self._lock():
        if await self._load_state() is not None:
            return
        await asyncio.to_thread(self._delete_chunk_files)
If we process everything on a single thread, storage proxy requests may be blocked when storage operations take a long time, especially with NFS-backed storage.
For consistency, this changes the implementation to use aiofiles.
    self._session_id = args.session_id
    self._total_size = args.total_size
    self._valkey = args.valkey_client
    self._lock_factory = args.lock_factory
Why is lock_factory passed as a parameter?
Because metadata updates in Redis must be protected by a lock to prevent concurrent reads and writes from multiple Storage Proxy instances.
Introduce the concurrency-safe upload session engine that replaces the
single-file-append TUS model. Session metadata is the source of truth in
Valkey, guarded by a per-session lock (SET NX + token-compare Lua release);
only the chunk payload bytes live on the shared filesystem as per-offset
chunk_<offset>.dat files. Coordination across multiple Storage Proxy replicas
happens entirely through Valkey, so there is no dependency on filesystem lock
semantics (fcntl.flock) or NFS attribute-cache coherence; chunk payloads are
content-addressed by (offset, sha256) and idempotent, needing no coordination.
Contents:
- common/clients/valkey_client/valkey_tus: ValkeyTusClient — per-session
state get/set (with TTL) + a per-session lock; reuses the Glide-based
AbstractValkeyClient like the other valkey clients.
- common/defs: REDIS_TUS_DB; common/metrics: VALKEY_TUS layer.
- errors/upload.py: ChunkConflictError (409), UploadSessionCorruptedError (500)
- services/upload/tus_session.py:
- ChunkRecord / SessionState (BackendAISchema; committed_offset as the
largest contiguous prefix, missing_ranges, progress_percent) / ChunkAcceptance
- UploadStatus StrEnum
- TusUploadSession (ensure_initialized, read_state, write_temp_chunk,
commit_chunk — idempotent dup / 409 conflict / no-op when completed,
assemble, cleanup) taking a TusUploadSessionArgs that carries the
ValkeyTusClient.
- Stale sessions auto-expire via the Valkey TTL (no separate GC sweep).
- Storage RootContext now provisions a ValkeyTusClient (server.py bootstrap).
- Integration tests drive a real Valkey (redis container) + tmp_path chunks.
Resolves BA-6154. Resolves BA-6155.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
📚 Stacked PRs
This PR is part of a 5-PR stack implementing BA-3974 (epic: BA-6153). Merge in order:
1. feat(BA-6155): add Valkey-backed chunk-store upload session engine ← you are here
2. fix(BA-6156): rewire TUS PATCH/HEAD to chunk-based store (actual user-visible fix)
3. test(BA-6157): add multi-proxy NFS race regression test
4. feat(BA-6158): support TUS Checksum extension
5. feat(BA-6159): add /upload/status endpoint + progress headers

Summary
Introduces the concurrency-safe TUS upload session engine. Per-session metadata
is the source of truth in Valkey (keyed by session id) and is guarded by a
per-session distributed lock produced by an injected `DistributedLockFactory`.
Chunk payload bytes live as individual files under `chunks/chunk_<offset>.dat`
on the shared filesystem; they are content-addressed by `(offset, sha256)` and
idempotent, so the heavy write path needs no coordination at all. Only the small
metadata read-modify-write window is serialized, and because the lock is
distributed, that serialization holds across Storage Proxy replicas.
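
The lock-free write path described above can be illustrated with a minimal sketch. It is synchronous for brevity (the engine in this PR is async), and the function signature, the `Iterable[bytes]` body, and the 8-byte random token are assumptions for illustration only; the temp-file naming follows the `chunk_<offset>.<token>.tmp` pattern from the description.

```python
import hashlib
import secrets
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable


@dataclass(frozen=True)
class WrittenChunk:
    path: Path
    length: int
    sha256: str


def write_temp_chunk(chunks_dir: Path, offset: int, body: Iterable[bytes]) -> WrittenChunk:
    """Stream a chunk body into a uniquely named temp file, hashing as it goes.

    No lock is taken: each attempt writes to its own file, so concurrent
    writers for the same offset cannot overwrite each other.
    """
    chunks_dir.mkdir(parents=True, exist_ok=True)
    token = secrets.token_hex(8)  # hypothetical token format
    tmp_path = chunks_dir / f"chunk_{offset}.{token}.tmp"
    digest = hashlib.sha256()
    length = 0
    with tmp_path.open("wb") as f:
        for piece in body:
            f.write(piece)
            digest.update(piece)
            length += len(piece)
    return WrittenChunk(path=tmp_path, length=length, sha256=digest.hexdigest())
```

Two writers sending the same bytes for the same offset end up with different temp paths but the same `(offset, sha256)` identity, which is what lets the later commit step treat the second one as a duplicate.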
Architecture
The handler (PR #11767) only adapts the request body; all chunk file I/O and
metadata management lives in the engine (this PR).
    flowchart TB
      classDef pr66 fill:#e6f7ff,stroke:#1890ff,color:#000
      classDef pr67 fill:#fff7e6,stroke:#fa8c16,color:#000
      classDef redis fill:#fff1f0,stroke:#cf1322,color:#000
      classDef disk fill:#f6ffed,stroke:#52c41a,color:#000
      subgraph L1["PATCH handler (PR #11767)"]
        REQ["request.content"]:::pr67
        ADP["TusChunkUploadStreamReader"]:::pr67
        HND["tus_upload_part"]:::pr67
        REQ --> ADP --> HND
      end
      subgraph L2["Upload engine (PR #11766)"]
        WTC["write_temp_chunk"]:::pr66
        CC["commit_chunk"]:::pr66
        FIN["assemble + cleanup"]:::pr66
      end
      subgraph L3R["Valkey · source of truth"]
        STATE["tus.upload.session:id"]:::redis
        LOCK["tus.upload.lock:id"]:::redis
      end
      subgraph L3F["shared FS · chunk payloads"]
        TMP["chunk_off.tok.tmp"]:::disk
        DAT["chunk_off.dat"]:::disk
        OUT["assembled file"]:::disk
      end
      HND -->|"1. drain body"| WTC
      HND -->|"2. promote temp"| CC
      HND -->|"3. on final commit"| FIN
      WTC -->|"lock-free write"| TMP
      CC -->|"acquire"| LOCK
      CC -->|"get / set"| STATE
      CC -->|"atomic rename"| DAT
      FIN -->|"concat in order"| OUT
      FIN -->|"mark COMPLETED"| STATE
      FIN -->|"unlink chunks"| DAT

Data model
    classDiagram
      class TusUploadSession {
        +ensure_initialized()
        +read_state() TusSessionState
        +open_temp_chunk(offset) Path
        +write_temp_chunk(offset, reader) WrittenChunk
        +commit_chunk(offset, path, length, sha256) ChunkCommitResult
        +assemble(target)
        +cleanup()
      }
      class TusSessionState {
        +session_id
        +total_size
        +committed_chunks : ChunkMetadata[]
        +status : UploadStatus
        +committed_offset
        +find_at_offset(offset)
        +append_chunk(chunk)
      }
      class ChunkMetadata {
        +offset
        +length
        +sha256
      }
      class WrittenChunk {
        +path
        +length
        +sha256
      }
      class ChunkCommitResult {
        +state : TusSessionState
        +committed : bool
        +is_final_commit : bool
      }
      class TusUploadSessionArgs {
        +session_dir
        +session_id
        +total_size
        +valkey_client : ValkeyTusClient
        +lock_factory : DistributedLockFactory
      }
      TusSessionState o-- ChunkMetadata
      TusUploadSession ..> TusSessionState
      TusUploadSession ..> ChunkCommitResult
      TusUploadSession ..> WrittenChunk
      TusUploadSession ..> TusUploadSessionArgs

One PATCH lifecycle
The expensive body write is lock-free; only the short metadata read-modify-write
is serialized via the distributed lock, so concurrent replicas never corrupt
each other.
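
To make the serialized read-modify-write window concrete, here is a small sketch under loud assumptions: `InMemoryStore` is a hypothetical stand-in for the Valkey state key, and `asyncio.Lock` stands in for the distributed lock (it only serializes tasks within one process, unlike the real lock). The state is reduced to a JSON list of committed offsets.

```python
import asyncio
import json
from typing import Dict, List, Optional


class InMemoryStore:
    """Hypothetical stand-in for the Valkey state key; not the real client."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    async def get(self, key: str) -> Optional[str]:
        await asyncio.sleep(0)  # yield control, as a network round trip would
        return self._data.get(key)

    async def set(self, key: str, value: str) -> None:
        await asyncio.sleep(0)
        self._data[key] = value


async def commit_offset(store: InMemoryStore, lock: asyncio.Lock, key: str, offset: int) -> None:
    # Only this short window is serialized; the chunk body was written earlier.
    async with lock:
        raw = await store.get(key)
        offsets: List[int] = json.loads(raw) if raw else []
        if offset not in offsets:
            offsets.append(offset)
        await store.set(key, json.dumps(sorted(offsets)))


async def run_demo(n: int = 20) -> List[int]:
    store = InMemoryStore()
    lock = asyncio.Lock()
    key = "tus.upload.session:demo"
    # Many concurrent commits; with the lock held, no update is lost.
    await asyncio.gather(*(commit_offset(store, lock, key, i * 100) for i in range(n)))
    raw = await store.get(key)
    return json.loads(raw or "[]")
```

Without the lock, two tasks could both read the same list, each append their own offset, and the later `set` would silently drop the other's update; that lost-update race is what the lock window prevents.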
    sequenceDiagram
      autonumber
      participant H as tus_upload_part
      participant S as TusUploadSession
      participant V as Valkey
      participant D as shared FS
      H->>S: ensure_initialized()
      S->>V: acquire lock + SET state if missing
      H->>S: write_temp_chunk(offset, reader)
      loop async for chunk in reader.read()
        S->>D: write chunk_off.tok.tmp (lock-free) + sha256
      end
      S-->>H: WrittenChunk(path, length, sha256)
      H->>S: commit_chunk(offset, path, length, sha256)
      Note over S,V: distributed lock window (short)
      alt status == COMPLETED
        S-->>H: no-op (late duplicate from another replica)
      else duplicate (same offset, length, sha256)
        S-->>H: committed=False (idempotent)
      else conflict (same offset, different content)
        S-->>H: ChunkConflictError 409
      else new chunk
        S->>D: atomic rename tmp to chunk_off.dat
        S->>V: update TusSessionState with new record
        S-->>H: ChunkCommitResult(is_final_commit?)
      end
      opt is_final_commit
        H->>S: assemble(target)
        S->>D: concat chunks in offset order
        S->>V: mark status=COMPLETED
        H->>S: cleanup()
        S->>D: unlink chunk files (state kept for TTL)
      end
      H-->>H: 204 No Content

Locking & HA semantics
Session state is NOT held in memory and NOT on the shared filesystem.
`TusUploadSession` is stateless in-process: every operation reads/writes the
`TusSessionState` in Valkey, and chunk payload files are content-addressed.
This makes it safe for the production topology of multiple storage-proxy
processes/replicas sharing an NFS mount: no in-process state to synchronize,
no dependency on filesystem lock semantics, no dependency on NFS attribute-cache
coherence.
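
Because nothing is cached in-process, a value such as `committed_offset` has to be derivable from the stored chunk list alone. The commit message describes it as the largest contiguous prefix; the following is a minimal sketch of that computation. The dataclasses only mirror the field names from the data model and are not the PR's actual schema classes, and the handling of overlapping chunks is an assumption.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass(frozen=True)
class ChunkMetadata:
    offset: int
    length: int
    sha256: str


@dataclass
class TusSessionState:
    session_id: str
    total_size: int
    committed_chunks: List[ChunkMetadata] = field(default_factory=list)

    @property
    def committed_offset(self) -> int:
        """End of the largest gap-free run of committed bytes starting at 0."""
        end = 0
        for chunk in sorted(self.committed_chunks, key=lambda c: c.offset):
            if chunk.offset > end:
                break  # first gap: everything after it is not yet contiguous
            end = max(end, chunk.offset + chunk.length)
        return end

    def find_at_offset(self, offset: int) -> Optional[ChunkMetadata]:
        """Return the chunk that starts exactly at `offset`, if any."""
        for chunk in self.committed_chunks:
            if chunk.offset == offset:
                return chunk
        return None
```

With chunks committed out of order, for example offset 100 before offset 0, the property stays at 0 until the gap at the front is filled, then jumps to 200.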
How replicas coordinate:
- A per-session distributed lock (from the injected `DistributedLockFactory`) serializes the metadata read-modify-write window. The factory pattern mirrors the manager's `DistributedLockFactory`: the lock backend lives in `RootContext.tus_lock_factory` and is injected into the engine.
- An atomic rename (`Path.replace`) promotes a temp chunk file (`chunk_<offset>.<token>.tmp`) to its canonical name (`chunk_<offset>.dat`). Readers see old-or-new, never a partial file.
- Duplicate commits are no-ops; mismatched content surfaces as a 409 `ChunkConflictError`.
- [...] session affinity, and still resolves to the same Valkey key and same chunk file path.
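
The promote-and-deduplicate rules above can be sketched as a single function. This is an illustration under assumptions, not the engine's code: the committed metadata is a plain dict rather than Valkey state, the caller is assumed to already hold the per-session lock, and unlinking the leftover temp file on a duplicate or conflict is a guess at reasonable cleanup behavior.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Dict


class ChunkConflictError(Exception):
    """Same offset, different content (surfaced as HTTP 409 in the PR)."""


@dataclass(frozen=True)
class ChunkMetadata:
    offset: int
    length: int
    sha256: str


def commit_chunk(
    committed: Dict[int, ChunkMetadata],
    chunks_dir: Path,
    tmp_path: Path,
    new: ChunkMetadata,
) -> bool:
    """Return True for a newly committed chunk, False for an idempotent duplicate."""
    existing = committed.get(new.offset)
    if existing is not None:
        tmp_path.unlink(missing_ok=True)  # assumed cleanup of the unused temp file
        if existing == new:
            return False  # same offset, length and sha256: nothing to do
        raise ChunkConflictError(f"conflicting chunk at offset {new.offset}")
    # Path.replace is an atomic rename on POSIX when both paths share a filesystem,
    # so readers see either no canonical file or the complete one.
    tmp_path.replace(chunks_dir / f"chunk_{new.offset}.dat")
    committed[new.offset] = new
    return True
```

A retry of the same chunk returns `False` and leaves `chunk_<offset>.dat` untouched, while a different payload at the same offset raises before any file is replaced.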
Stale or abandoned sessions are reclaimed by the Valkey state's TTL (24h), so
there is no separate GC sweep.
Resolves BA-6154, BA-6155.